#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <numa.h>
#include <stdlib.h>
#include "param.h"
#include "types.h"
#include "defs.h"

/* Buffer pool shared with the rest of the program (defined elsewhere). */
extern mempool pool;

/*
 * Completion flags polled by the I/O, sort and monitor threads in this file.
 * input_fin: set by load() on end of input or a read error.
 * output_fin: set by store() when no buffer is in flight, or on a write error.
 * sort_fin: set by the first sort thread that observes both flags above.
 * NOTE(review): these are plain, non-atomic variables shared between threads
 * without a lock; consider _Atomic or mutex protection.
 */
uint64 input_fin;
uint64 output_fin;
uint64 sort_fin;

/* Descriptor load() reads from. */
extern int in_fd;
/* Descriptor store() reassigns for every output file it creates. */
extern int out_fd;
/* Non-zero enables the NUMA-aware paths in load/store/sort_thread. */
extern int numa_opt;
/* Per-thread state, indexed by sort-thread id. */
extern thread threads[];

/*
 * Prepare shared state before the worker threads start.
 *
 * Clears the three completion flags and promotes every B_UNUSED buffer in
 * the pool to B_EMPTY so load() may fill it. Always returns 0.
 */
int
thread_init () {
    uint64 idx = 0;

    /* Clear the completion flags polled by the worker threads. */
    input_fin = output_fin = sort_fin = 0;

    /* Any buffer that has never been used becomes available for loading. */
    while (idx < pool.buf_num) {
        if (pool.bufs[idx].stat == B_UNUSED)
            pool.bufs[idx].stat = B_EMPTY;
        idx++;
    }
    return 0;
}

/*
 * CHANGE_STAT(s0, s1, s2, code): try to claim pool.bufs[i] and run `code`.
 *
 * The expansion site must have `i` (the buffer index), `ready` and
 * `mbuf` (a membuf pointer) in scope.
 *
 * 1. Under the buffer's mutex, a buffer currently in state s0 is moved to
 *    s1, `ready` is set to 1 and `mbuf` is pointed at it. Otherwise
 *    `ready` is left 0 and nothing else happens.
 * 2. `code` then runs WITHOUT the mutex held. Other expansions only claim
 *    buffers in their own s0 state, so s1 acts as an ownership marker.
 * 3. Afterwards, under the mutex again, the buffer is moved to s2 unless
 *    `code` already changed its state (e.g. to B_BROKEN or B_EMPTY).
 *    Taking the mutex here keeps this write from racing with the locked
 *    state scans done by other threads.
 *
 * The expansion is a bare { } block, not do { } while (0), on purpose.
 * A `break` inside `code` that is not within a loop of its own leaves the
 * caller's enclosing loop and skips step 3. No mutex is held at that point.
 */
#define CHANGE_STAT(s0, s1, s2, code) {                      \
    ready = 0;                                               \
    pthread_mutex_lock (&(pool.bufs[i].mtx));                \
    if (pool.bufs[i].stat == (s0)) {                         \
        pool.bufs[i].stat = (s1);                            \
        ready = 1;                                           \
        mbuf = &(pool.bufs[i]);                              \
    }                                                        \
    pthread_mutex_unlock (&(pool.bufs[i].mtx));              \
    if (ready) {                                             \
        code;                                                \
        pthread_mutex_lock (&(pool.bufs[i].mtx));            \
        if (pool.bufs[i].stat == (s1))                       \
            pool.bufs[i].stat = (s2);                        \
        pthread_mutex_unlock (&(pool.bufs[i].mtx));          \
    }                                                        \
}

static void 
load () {
uint64 cnt, ready, i, numa_ready;
membuf *mbuf;
//printf ("try to load...\n");
int numa_cnt[NUMA_NODES];
if (!input_fin) {
    cnt = 0;
    ready = 0;
    numa_ready = 0;
    for (i = 0; i < NUMA_NODES; i++)
        numa_cnt[i] = 0;
    for (i = 0; i < pool.buf_num; i++) {
        pthread_mutex_lock (&(pool.bufs[i].mtx));
        if (pool.bufs[i].stat == B_EMPTY) {
            cnt++;
            numa_cnt[pool.bufs[i].numa_id]++; 
        }
        pthread_mutex_unlock (&(pool.bufs[i].mtx));
    }
    for (i = 0; i < NUMA_NODES; i++)
        if (numa_cnt[i] >= NUMA_IO_THR)
            numa_ready = 1;
    // printf ("EMPTY_CNT:%ld\n", cnt);
    if (cnt >= IO_THR || (numa_ready&&numa_opt)) {
    do {
        /* EMPTY->LOADED， cnt个 */
        if (input_fin) break;
        for (i = 0; i < pool.buf_num; i++) {
            CHANGE_STAT (B_EMPTY, B_LOADING, B_LOADED, ({
                uint64 need = mbuf->size;
                int n;
                while (need > 0) {
                    if ((n = read (in_fd, mbuf->buf, need)) == -1) {
                    perror ("Unable to read");
                    mbuf->stat = B_BROKEN;
                    printf ("INPUT: Finish reading.\n");
                    input_fin = 1;
                    break;
                    }
                    if (n == 0) {
                        input_fin = 1;
                        printf ("INPUT: Finish reading...\n");
                        mbuf->stat = B_EMPTY;
                        break;
                    }
                    need -= n;
                }
                cnt--;
            }));
            if (cnt == 0 || input_fin) break;
        }
    } while (cnt > 0);
    }
}
}

static void
store () {
uint64 cnt, ready, i, numa_ready;
membuf *mbuf;
int temp;
int numa_cnt[NUMA_NODES];
//printf ("try to write...\n");
if (!output_fin) {
    cnt = 0;
    ready = 0;
    temp = 1;
    for (i = 0; i < NUMA_NODES; i++)
        numa_cnt[i] = 0;
    for (i = 0; i < pool.buf_num; i++) {
        pthread_mutex_lock (&(pool.bufs[i].mtx));
        if (pool.bufs[i].stat == B_SORTED) {
            cnt++;
            numa_cnt[pool.bufs[i].numa_id]++; 
        }
        if (pool.bufs[i].stat > B_EMPTY && pool.bufs[i].stat <= B_WRITING)
            temp = 0;
        pthread_mutex_unlock (&(pool.bufs[i].mtx));
    }
    for (i = 0; i < NUMA_NODES; i++)
        if (numa_cnt[i] >= NUMA_IO_THR)
            numa_ready = 1;
    output_fin = temp;
    //printf ("SORTED_CNT:%ld\n", cnt);
    if (cnt >= IO_THR || input_fin || (numa_ready&&numa_opt)) {
        do {
            if (output_fin) {
                printf ("OUTPUT: Finish writing...\n");
                break;
            }
            /* SORTED->EMPTY, cnt个 */
            for (i = 0; i < pool.buf_num; i++) {
            CHANGE_STAT (B_SORTED, B_WRITING, B_EMPTY, ({
                char * filename;
                filename = get_rand_name ();
                if ((out_fd = open (filename, O_CREAT | O_TRUNC | O_RDWR,
                                                S_IRUSR | S_IWUSR)) == -1) {
                    perror ("Unable to create file");
                    output_fin = 1;
                    break;
                }
                free (filename);
                uint64 need = mbuf->size;
                int n;
                while (need > 0) {
                    if ((n = write (out_fd, mbuf->buf, need)) == -1) {
                    perror ("Unable to write");
                    mbuf->stat = B_BROKEN;
                    output_fin = 1;
                    break;
                    }
                    need -= n;
                    if (n == 0) {
                        fprintf (stderr, "OUTPUT: Insufficient disk space.\n");
                    }
                }
                close (out_fd);
                //printf ("OUTPUT_CNT:%ld\n" ,cnt);
                cnt--;
            }));
            if (cnt == 0) break;
        }
        } while (cnt > 0);
    }
}
}

/*
 * I/O driver loop.
 *
 * About once per second, refills EMPTY buffers from the input (load) and
 * drains SORTED buffers to output files (store). Stops once both
 * input_fin and output_fin are set.
 */
void
io_thread () {
    for (;;) {
        if (input_fin && output_fin)
            break;
        sleep (1);
        load ();
        store ();
    }
}

/*
 * Sort one claimed buffer in place, using the algorithm selected at build time:
 * USE_QSORT -> quick_sort, NO_CACHE -> heap_sort_no_cache, otherwise heap_sort.
 *
 * The selection is done here rather than inside the CHANGE_STAT argument
 * because preprocessing directives inside a macro argument list are
 * undefined behaviour in ISO C.
 */
static void
sort_one_buffer (membuf *mbuf, int id) {
#ifdef USE_QSORT
    (void) id;
    quick_sort (mbuf->buf, mbuf->size / sizeof (uint64));
#elif defined(NO_CACHE)
    heap_sort_no_cache (mbuf->buf, mbuf->size / sizeof (uint64),
                        threads[id].retarder0, threads[id].retarder1);
#else
    (void) id;
    heap_sort (mbuf->buf, mbuf->size / sizeof (uint64));
#endif
}

/*
 * Worker loop for one sort thread (B_LOADED -> B_SORTING -> B_SORTED).
 *
 * id_p points to this thread's int id, an index into threads[]. With
 * numa_opt, the thread binds itself to the node returned by
 * get_current_numa_id() and only sorts buffers on that node. It polls
 * every ~1 ms. It exits when sort_fin is set, or when both input_fin and
 * output_fin are set; in the latter case it sets sort_fin so the other
 * sort threads stop too.
 */
void
sort_thread (void *id_p) {
    uint64 ready, i;   /* `ready` is used by CHANGE_STAT */
    membuf *mbuf;
    uint64 numa_id = 0;
    int id;

    id = *(int *) id_p;
    printf ("Sort thread %d started.\n", id);
    if (numa_opt) {
        numa_id = get_current_numa_id (id);
        if (numa_run_on_node ((int) numa_id) == -1)
            perror ("numa_run_on_node");
    }
    for (;;) {
        usleep (1000);
        if (sort_fin) break;
        if (input_fin && output_fin) {
            sort_fin = 1;
            break;
        }
        for (i = 0; i < pool.buf_num; i++) {
            if (numa_opt && pool.bufs[i].numa_id != numa_id)
                continue;
            CHANGE_STAT (B_LOADED, B_SORTING, B_SORTED,
                         sort_one_buffer (mbuf, id));
        }
    }
}

#define glue(x,y) x ## y

void
monitor_thread () {
    uint64 i;
    for (;;) {
        if (input_fin && output_fin) {
            printf ("MONITOR: Process finished.\n");
            break;
        }
        for (i = 0; i < pool.buf_num; i++) {
            if (i % 4 == 0)
                printf ("\n");
            #define DISPLAY(x) { \
                if (pool.bufs[i].stat == glue(B_,x))\
                printf ("BUF %ld: %s\t", i, #x);\
            }
            DISPLAY (UNUSED);
            DISPLAY (EMPTY);
            DISPLAY (LOADING);
            DISPLAY (LOADED);
            DISPLAY (SORTING);
            DISPLAY (SORTED);
            DISPLAY (WRITING);
            DISPLAY (BROKEN);
            #undef DISPLAY
        }
        printf ("\n");
        sleep (1);
    }
    
}